package com.paul.mq.dependencies.mqtt.emqx;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Random;

/**
 * @author zmk
 * @date 2021/1/11 11:01
 * @description
 */
//@Component
public class MqttConsumer implements ApplicationRunner {

    private static final Logger log = LoggerFactory.getLogger(MqttConsumer.class);

    //多个topic，使用, 隔开
    static String subTopic = "testtopic/p2p/GID_PAUL_COM/@@@/#";
    static String pubTopic = "testtopic/p2p/GID_PAUL_COM/@@@/AKSEC17F15CD41F";
    String content = "Hello World";
    static int initQos = 1;
    String broker = "tcp://47.100.210.120:1883";
    String clientId = "server";

    String userName = "admin";
    String passWord = "kj123456!@#";

    int outTime = 10;
    int keepTime = 20;

    static MqttClient mqttClient;
    static MqttConnectOptions connOpts;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("初始化并启动mqtt......");
        connect();
    }


    private void connect() {
        try {
            // 1 创建客户端
            getClient();
            // 2 设置配置
            getOption();
            // 3 消息发布质量
            String[] topic = subTopic.split(",");
            int[] qos = getQos(topic.length);
            // 4 最后设置
            create(connOpts, topic, qos);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 创建客户端
     */
    public void getClient() {
        try {
            if (null == mqttClient) {
                //host为主机名，clientid即连接MQTT的客户端ID，一般以唯一标识符表示，
                // MemoryPersistence设置clientid的保存形式，默认为以内存保存
                mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
            }
            log.info("--创建mqtt客户端");
        } catch (Exception e) {
            log.error("创建mqtt客户端异常：" + e);
        }
    }

    /**
     * 生成配置对象，用户名，密码等
     */
    private void getOption() {
        //MQTT连接设置
        connOpts = new MqttConnectOptions();
        //设置是否清空session,false表示服务器会保留客户端的连接记录，true表示每次连接到服务器都以新的身份连接
        connOpts.setCleanSession(true);
        //设置连接的用户名
        connOpts.setUserName(userName);
        //设置连接的密码
        connOpts.setPassword(passWord.toCharArray());
        //设置超时时间 单位为秒
        connOpts.setConnectionTimeout(outTime);
        //设置会话心跳时间 单位为秒 服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线，但这个方法并没有重连的机制
        connOpts.setKeepAliveInterval(keepTime);
        connOpts.setAutomaticReconnect(true);
        //setWill方法，如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
//            option.setWill(topic, "close".getBytes(), 2, true);
    }

    /**
     * qos
     */
    public int[] getQos(int length) {

        int[] qos = new int[length];
        for (int i = 0; i < length; i++) {
            /**
             *  MQTT协议中有三种消息发布服务质量:
             *
             * QOS0： “至多一次”，消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况，环境传感器数据，丢失一次读记录无所谓，因为不久后还会有第二次发送。
             * QOS1： “至少一次”，确保消息到达，但消息重复可能会发生。
             * QOS2： “只有一次”，确保消息到达一次。这一级别可用于如下情况，在计费系统中，消息重复或丢失会导致不正确的结果，资源开销大
             */
            qos[i] = initQos;
        }
        log.info("--设置消息发布质量");
        return qos;
    }

    /**
     * 装在各种实例和订阅主题
     */
    public void create(MqttConnectOptions options, String[] topic, int[] qos) {
        try {
            mqttClient.setCallback(new PushCallBack(mqttClient, options, topic, qos));
            log.info("--添加回调处理类");
            mqttClient.connect(options);
        } catch (Exception e) {
            log.info("装载实例或订阅主题异常：" + e);
        }
    }

    /**
     * 订阅某个主题
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String topic, int qos) {
        try {
            log.info("topic:" + topic);
            mqttClient.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 发布，非持久化
     * qos根据文档设置为1
     *
     * @param topic
     * @param msg
     */
    public static void publish(String topic, String msg) {
        if (connOpts.getMaxInflight() - mqttClient.getPendingDeliveryTokens().length > 0) {

        }
        publish(1, false, topic, msg);
    }

    /**
     * 发布
     */
    public static void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = mqttClient.getTopic(topic);
        if (null == mTopic) {
            log.error("topic：" + topic + " 不存在");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();

            if (!token.isComplete()) {
                log.info("消息发送成功");
            }
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }


    public class PushCallBack implements MqttCallbackExtended {
        private MqttClient client;
        private MqttConnectOptions options;
        private String[] topic;
        private int[] qos;

        public PushCallBack(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
            this.client = client;
            this.options = options;
            this.topic = topic;
            this.qos = qos;
        }

        /**
         * 断开重连
         */
        @Override
        public void connectionLost(Throwable cause) {
            log.info("MQTT连接断开，发起重连......");
            try {
                if (null != client && !client.isConnected()) {
                    client.reconnect();
                    log.error("尝试重新连接");
                } else {
                    client.connect(options);
                    log.error("尝试建立新连接");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * 消息处理
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            String msg = new String(message.getPayload(), StandardCharsets.UTF_8);
            log.info("收到topic:" + topic + " 消息：" + msg);
            try {
                if (msg.equals("11111")) {

                }
//                        consumerMessageManager.doOnMessage(result);
                publish(pubTopic, clientId + content + new Random().nextInt());
            } catch (Exception e) {
                log.info("处理mqtt消息异常:" + e);
            }
        }

        /**
         * 接收到消息调用令牌中调用
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            System.out.println("deliveryComplete---------" + token.isComplete());
            log.info("deliveryComplete---------" + Arrays.toString(topic));
        }

        /**
         * mqtt连接后订阅主题
         */
        @Override
        public void connectComplete(boolean b, String s) {
            try {
                if (null != topic && null != qos) {
                    if (client.isConnected()) {
                        client.subscribe(topic, qos);
                        log.info("mqtt连接成功，客户端ID：" + clientId);
                        log.info("--订阅主题:：" + Arrays.toString(topic));
                    } else {
                        log.info("mqtt连接失败，客户端ID：" + clientId);
                    }
                }
            } catch (Exception e) {
                log.info("mqtt订阅主题异常:" + e);
            }
        }
    }
}
